Java 并发基础(三)

ThreadLocalRandom 生成随机数

在 java 7 以及到现在,java.util.Random 都是使用比较广泛的随机数生成工具类, 而且 java.lang.Math 中的随机数生成也使用的是 java.util.Random 的实例。

1
2
3
4
5
6
7
8
public static void main(String[] args) {
// 创建一个默认种子的随机数生成器
Random random = new Random();
for (int i = 0; i < 10; i++) {
// 输出10个在0~5(包含0,不包含5)之间的随机数
System.out.println(random.nextInt(5));
}
}

随机数的生成需要一个默认的种子,这个种子其实是一个 long 类型的数字 , 你可以在创建 Random 对象时通过构造函数指定,如果不指定则在默认构造函数内部生成一个默认的值。有了默认的种子后,如何生成随机数呢?

1
2
3
4
5
6
7
8
9
public int nextInt(int bound) {
// (3) 参数检查
if (bound <= 0)
throw new IllegalArgumentException(BadBound);
// (4) 根据老的种子生成新的种子
int r = next(31);
// (5) 根据新的种子计算随机数 ...
return r;
}

新的随机数的生成需要两个步骤 :

  • 首先根据老的种子生成新的种子。
  • 然后使用新的种子来计算新的随机数。

其中步骤(4)我们可以抽象为 seed=f(seed), 其中 f 是一个固定的函数,比如 seed= f(seed)=a*seed+b ; 步骤(5)也可以抽象为 g(seed,bound),其中 g 是一个固定的函数,比 如g(seed,bound)=(int)((bound* (long)seed) >> 31)。在单线程情况下每次调用 nextInt 都是根 据老的种子计算出新的种子,这是可以保证随机数产生的随机性的。但是在多线程下多个 线程可能都拿同一个老的种子去执行步骤(4)以计算新的种子,这会导致多个线程产生 的新种子是一样的,由于步骤(5)的算法是固定的,所以会导致多个线程产生相同的随 机值,这并不是我们想要的。所以步骤(4)要保证原子性,也就是说当多个线程根据同 一个老种子计算新种子时,第一个线程的新种子被计算出来后,第二个线程要丢弃自己 老的种子,而使用第一个线程的新种子来计算自己的新种子,依此类推,只有保证了这 个,才能保证在多线程下产生的随机数是随机的。Random 函数使用一个原子变量达到了 这个效果,在创建 Random 对象时初始化的种子就被保存到了种子原子变量里面,下面看 next() 的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected int next(int bits) {
long oldseed, nextseed;
AtomicLong seed = this.seed;
do {
// (6) 获取当前原子变量种子的值。
oldseed = seed.get();
// (7) 根据当前种子值计算新的种子。
nextseed = (oldseed * multiplier + addend) & mask;
// (8) 使用 CAS 操作,它使用新的种子去更新老的种子,在多线程下可能多个线程都同时执行到了代码(6),
// 那么可能多个线程拿到的当前种子的值是同一个,然后执行 步骤(7)计算的新种子也都是一样的。
// 但是步骤 (8)的 CAS 操作会保证只有一个线程可以更新老的种子为新的,失败的线程会通过循环重新
// 获取更新后的种子作为当前种子去 计算老的种子,这就解决了上面提到的问题,保证了随机数的随机性。
} while(!seed.compareAndSet(oldseed, nextseed));
// (9) 使用固定算法根据新的种子计算随机数。
return (int)(nextseed >>> (48 - bits));
}

所以,每个 Random 实例里面都有一个原子性的种子变量用来记录当前的种子值, 当要生成新的随机数时需要根据当前种子计算新的种子并更新回原子变量。在多线程 下使用单个 Random 实例生成随机数时,当多个线程同时计算随机数来计算新的种子 时,多个线程会竞争同一个原子变量的更新操作,由于原子变量的更新是 CAS 操作,同 时只有一个线程会成功,所以会造成大量线程进行自旋重试,这会降低并发性能,因此 ThreadLocalRandom 应运而生。下面代码展示其基本使用:

1
2
3
4
5
6
public static void main(String[] args) {
ThreadLocalRandom random = ThreadLocalRandom.current();
for (int i = 0; i < 10; i++) {
System.out.println(random.nextInt(5));
}
}

ThreadLocalRandom 原理类似于 ThreadLocal。ThreadLocal 通过让每一个线程复制一份变量,使得在每个线程对变 量进行操作时实际是操作自己本地内存里面的副本,从而避免了对共享变量进行同步。实际上 ThreadLocalRandom 的实现也是这个原理,Random 的缺点是多个线程会使用同一个 原子性种子变量,从而导致对原子变量更新的竞争。如果每个线程都维护一个种子变量,则每个线程生成随机数时都根据自己老的 种子计算新的种子,并使用新种子更新老的种子,再根据新种子计算随机数,就不会存在 竞争问题了,这会大大提高并发性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ThreadLocalRandom extends Random {
// Unsafe 机制
private static final sun.misc.Unsafe UNSAFE; private static final long SEED;
private static final long PROBE;
private static final long SECONDARY;
static {
try {
// 获取unsafe实例
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
// 获取Thread类里面threadLocalRandomSeed变量在Thread实例里面的偏移量
SEED = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomSeed"));
// 获取Thread类里面threadLocalRandomProbe变量在Thread实例里面的偏移量
PROBE = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomProbe"));
// 获取Thread类里面threadLocalRandomSecondarySeed变量在Thread实例里面的偏移量,这个值在后面讲解LongAdder时会用到
SECONDARY = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomSecondarySeed"));
} catch (Exception e) {
throw new Error(e);
}
}

// ThreadLocalRandom current() 方法
static final ThreadLocalRandom instance = new ThreadLocalRandom();
public static ThreadLocalRandom current() {
// 如果当前线程中 threadLocalRandomProbe 的变量值为 0(默 认情况下线程的这个变量值为 0)
// 则说明当前线程是第一次调用 ThreadLocalRandom 的 current 方法,那么就需要调用 localInit 方法计算当前线程的初始化种子变量。
// 这里为了延迟初始化,在不需要使用随机数功能时就不初始化 Thread 类中的种子变量,这是一种优化。
if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0) {
// 首先根据 probeGenerator 计算当前线程中 threadLocalRandomProbe 的初始 化值,然后根据 seeder 计算当前线程的初始化种子,而后把这两个变量设置到当前线程。
localInit();
}
// 返回 ThreadLocalRandom 的实例,多个线程返回的是同一个 ThreadLocalRandom 实例。
return instance;
}
static final void localInit() {
int p = probeGenerator.addAndGet(PROBE_INCREMENT);
int probe = (p == 0) ? 1 : p; // skip 0
long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
Thread t = Thread.currentThread();
UNSAFE.putLong(t, SEED, seed);
UNSAFE.putInt(t, PROBE, probe);
}
}

ThreadLocalRandom 类继承了 Random 类并重写了 nextInt 方 法,在 ThreadLocalRandom 类中并没有使用继承自 Random 类的原子性种子变量。在 ThreadLocalRandom 中并没有存放具体的种子,具体的种子存放在具体的调用线程的 threadLocalRandomSeed 变量里面(通过 Unsafe 进行操作)。ThreadLocalRandom 类似于 ThreadLocal 类,就是个工 具类。当线程调用 ThreadLocalRandom 的 current 方法时,ThreadLocalRandom 负责初始化 调用线程的 threadLocalRandomSeed 变量,也就是初始化种子。

当 调 用 ThreadLocalRandom 的 nextInt 方 法 时, 实 际 上 是 获 取 当 前 线 程 的 threadLocalRandomSeed 变量作为当前种子来计算新的种子,然后更新新的种子到当前线 程的 threadLocalRandomSeed 变量,而后再根据新种子并使用具体算法计算随机数。这里 需要注意的是,threadLocalRandomSeed 变量就是 Thread 类里面的一个普通 long 变量,它 并不是原子性变量。其实道理很简单,因为这个变量是线程级别的,所以根本不需要使用 原子性变量,如果你还是不理解可以思考下 ThreadLocal 的原理。其中 seeder 和 probeGenerator 是两个原子性变量,在初始化调用线程的种子和探针变 量时会用到它们,每个线程只会使用一次。

另外,变量 instance 是 ThreadLocalRandom 的一个实例,该变量是 static 的。当多线 程通过 ThreadLocalRandom 的 current 方法获取 ThreadLocalRandom 的实例时,其实获取 的是同一个实例。但是由于具体的种子是存放在线程里面的,所以在 ThreadLocalRandom 的实例里面只包含与线程无关的通用算法,所以它是线程安全的。

计算当前线程的下一个随机数的方法 nextInt 使用的就是父类 Random 的方法,我们重点看下 nextSeed() 方法。

1
2
3
4
5
final long nextSeed() {
Thread t; long r; //
UNSAFE.putLong(t = Thread.currentThread(), SEED, r = UNSAFE.getLong(t, SEED) + GAMMA);
return r;
}

在如上代码中,首先使用r = UNSAFE.getLong(t, SEED) 获取当前线程中 threadLocalRandomSeed 变量的值,然后在种子的基础上累加 GAMMA 值作为新种子,而 后使用 UNSAFE 的 putLong 方法把新种子放入当前线程的 threadLocalRandomSeed 变量中。


原子操作相关类

JUC 包提供了一系列的原子性操作类,这些类都是使用非阻塞算法 CAS 实现的,相比使用锁实现原子性操作这在性能上有很大提高。由于原子性操作类的原理都大致相同, 所以这里只介绍最简单的 AtomicLong 类的实现原理以及 JDK 8 中新增的 LongAdder 和 LongAccumulator 类的原理。


AtomicLong

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class AtomicLong extends Number implements java.io.Serializable {
// 获取Unsafe实例,之所以能通过 Unsafe.getUnsafe()方法获取到 Unsafe 类的实例
// 其实这是因为 AtomicLong 类就是通过 BootStarp 类加载器进行加载的。
private static final Unsafe U = Unsafe.getUnsafe();
// 存放变量value的偏移量
private static final long VALUE = U.objectFieldOffset(AtomicLong.class, "value");
// 被声明为 volatile 的,这是为了在多线程下保证内存可见性, value 是具体存放计数的变量。
private volatile long value;

// 判断JVM是否支持Long类型无锁CAS
static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
private static native boolean VMSupportsCS8();

public AtomicLong(long initialValue) {
value = initialValue;
}

// ...

// 调用 unsafe方法,原子性设置value值为原始值+1,返回值为递增后的值
public final long incrementAndGet() {
return U.getAndAddLong(this, VALUE, 1L) + 1L;
}

// 调用 unsafe方法,原子性设置value值为原始值-1,返回值为递减之后的值
public final long decrementAndGet() {
return U.getAndAddLong(this, VALUE, -1L) - 1L;
}

// 调用 unsafe 方法,原子性设置value值为原始值+1,返回值为原始值
public final long getAndIncrement() {
return U.getAndAddLong(this, VALUE, 1L);
}

// 调用 unsafe 方法,原子性设置value值为原始值-1,返回值为原始值
public final long getAndDecrement() {
return U.getAndAddLong(this, VALUE, -1L);
}

// cas 操作,在内部还是调用了 unsafe.compareAndSwapLong 方法
public final boolean compareAndSet(long expectedValue, long newValue) {
return U.compareAndSetLong(this, VALUE, expectedValue, newValue);
}

// ...
}

AtomicLong 的应用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Demo {
private static final AtomicLong atomicLong = new AtomicLong();
private static final Integer[] arrayOne = new Integer[]{0,1,2,3,0,5,6,0,56,0};
private static final Integer[] arrayTwo = new Integer[]{10,1,2,3,0,5,6,0,56,0};

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (Integer n : arrayOne) {
if (n == 0) {
atomicLong.incrementAndGet();
}
}
}, "t1");
Thread t2 = new Thread(() -> {
for (Integer n : arrayTwo) {
if (n == 0) {
atomicLong.incrementAndGet();
}
}
}, "t2");

t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("count 0:" + atomicLong.get()); // 7
}
}

在没有原子类的情况下,实现计数器需要使用一定的同步措施,比如使用 synchronized 关键字等,但是这些都是阻塞算法,对性能有一定损耗,而本章介绍的这些原子操作类都使用 CAS 非阻塞算法,性能更好。但是在高并发情况下 AtomicLong 还会存在性能问题。JDK 8 提供了一个在高并发下性能更好的 LongAdder 类。


LongAdder

AtomicLong 通过 CAS 提供了非阻塞的原子性操作,相比使用阻塞算法的 同步器来说它的性能已经很好了,但是 JDK 开发组并不满足于此。使用 AtomicLong 时, 在高并发下大量线程会同时去竞争更新同一个原子变量,但是由于同时只有一个线程的 CAS 操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试 CAS 的操作,而这会白白浪费 CPU 资源。

既然 AtomicLong 的性能瓶颈是由于过多线程同时去竞争一个变量的 更新而产生的,那么如果把一个变量分解为多个变量,让同样多的线程去竞争多个资源, 是不是就解决了性能问题?是的,LongAdder 就是这个思路。

使用 LongAdder 时,则是在内部维护多个 Cell 变量,每个 Cell 里面 有一个初始值为 0 的 long 型变量,这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少,这变相地减少了争夺共享资源的并发量。另外,多个线程在争夺同一 个 Cell 原子变量时如果失败了,它并不是在当前 Cell 变量上一直自旋 CAS 重试,而是尝试在其他 Cell 的变量上进行 CAS 尝试,这个改变增加了当前线程重试 CAS 成功的可能性。 最后,在获取 LongAdder 当前值时,是把所有 Cell 变量的 value 值累加后再加上 base 返回的。

LongAdder 维护了一个延迟初始化的原子性更新数组(默认情况下 Cell 数组是 null) 和一个基值变量 base。由于 Cells 占用的内存是相对比较大的,所以一开始并不创建它, 而是在需要时创建,也就是惰性加载。当一开始判断 Cell 数组是 null 并且并发线程较少时,所有的累加操作都是对 base 变量进行的。保持 Cell 数组的大小为 2 的 N 次方,在初始化时 Cell 数组中的 Cell 元素个数为 2,数组里面的变量实体是 Cell 类型。Cell 类型是 AtomicLong 的一个改进,用来减少缓存的争用 , 也就是解决伪共享问题。

对于大多数孤立的多个原子操作进行字节填充是浪费的,因为原子性操作都是无规律 地分散在内存中的(也就是说多个原子性变量的内存地址是不连续的),多个原子变量被 放入同一个缓存行的可能性很小。但是原子性数组元素的内存地址是连续的,所以数组内 的多个元素能经常共享缓存行,因此这里使用 @sun.misc.Contended 注解对 Cell 类进行字 节填充,这防止了数组中多个元素共享一个缓存行,在性能上是一个提升。

为了解决高并发下多线程对一个变量 CAS 争夺失败后进行自旋而造成的降低并发 性能问题,LongAdder 在内部维护多个 Cell 元素(一个动态的 Cell 数组)来分担对单 个变量进行争夺的开销。下面围绕以下话题从源码角度来分析 LongAdder 的实现。

  1. LongAdder 的结构是怎样的?
  2. 如何保证线程操作被分配的 Cell 元素的原子性?
  3. 当前线程应该访问 Cell 数组里面的哪一个 Cell 元素?
  4. 如何初始化 Cell 数组?
  5. Cell 数组如何扩容?
  6. 线程访问分配的 Cell 元素有冲突后如何处理?

LongAdder 类继承自 Striped64 类,在 Striped64 内部维护着三个变量。

  • cells:cells 数组。
  • base:LongAdder 的真实值其实是 base 的值与 Cell 数组里面所有 Cell 元素中的 value 值的累加, base 是个基础值,默认为 0。
  • cellsBusy:用来实现自旋锁,状态值只有 0 和 1,当创建 Cell 元素, 扩容 Cell 数组或者初始化 Cell 数组时,使用 CAS 操作该变量来保证同时只有一个线程可 以进行其中之一的操作。

来看看 Cell 的构造。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 可以看到,Cell 的构造很简单,其内部维护一个被声明为 volatile 的变量,这里声明为 volatile 是
* 因为线程操作 value 变量时没有使用锁,为了保证变量的内存可见性这里将其声明为 volatile 的。
*
* 另外 cas 函数通过 CAS 操作,保证了当前线程更新时被分配的 Cell 元素中 value 值的原子性。
* Cell 类使用 @sun.misc.Contended 修饰是为了避免伪共享。 到这里我们回答了问题1和问题2。
*/
@jdk.internal.vm.annotation.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}

再来看 LongAdder 其他一些关键方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
* 累加所有 Cell 内部的 value 值后再累加 base。
* 由于计算总和时没有对 Cell 数组进行加锁,所以在累加过程中 可能有其他线程对 Cell 中的值进行了修改,也有
* 可能对数组进行了扩容,所以 sum 返回的值并不是非常精确的,其返回值并不是一个调用 sum 方法时的原子快照值。
*/
public long sum() {
Cell[] as = cells;
Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
}
}
}
return sum;
}
public long longValue() { // 等价于 sum
return sum();
}

/**
* 如下代码把 base 置为 0,如果 Cell 数组有元素,则元素值 被重置为 0。
*/
public void reset() {
Cell[] as = cells;
Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
a.value = 0L;
}
}
}
}

/**
* sum 的改造版本,如下代码在使用 sum 累加对应的 Cell 值后,把当前 Cell 的值重置为 0,base 重置为 0。
* 这样,当多线程调用该方法时会有问题, 比如考虑第一个调用线程清空 Cell 的值,则后一个线程调用时累加的都是 0 值。
*/
public long sumThenReset() {
Cell[] as = cells;
Cell a;
long sum = base;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
a.value=0L;
}
}
}
return sum;
}

final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

/**
* 代码(1) 首先看 cells 是否为 null,如果为 null 则当前在基础变量 base 上进行累加, 这时候就类似 AtomicLong 的操作。
* 如果 cells 不为 null 或者线程执行代码(1)的 CAS 操作失败了,则会去执行代码(2)。
* 代码(2)(3)决定当前线程应该访问 cells 数组里面的哪一个 Cell 元素,如果当前线程映射的元素存在则执行代码(4)。
* 使用 CAS 操作去更新分配的 Cell 元素的 value 值,如果 当前线程映射的元素不存在或者存在但是 CAS 操作失败则执行代码(5)。
* 其实将代码(2)(3)(4)合起来看就是获取当前线程应该访问的 cells 数组的 Cell 元素,然后进行 CAS 更新操作,只是在获取期间如果有些条件不满足则会跳转到代码(5)执行。
* 另外当前线程 应该访问 cells 数组的哪一个 Cell 元素是通过 getProbe() & m 进行计算的,其中 m 是当前 cells 数组元素个数 -1,
* getProbe() 则用于获取当前线程中变量 threadLocalRandomProbe 的 值,这个值一开始为 0,在代码(5)里面会对其进行初始化。
* 并且当前线程通过分配的 Cell 元素的 cas 函数来保证对 Cell 元素 value 值更新的原子性。
*/
public void add(long x) {
Cell[] as;
long b, v;
int m;
Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) { // (1)
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 || // (2)
(a = as[getProbe() & m]) == null || // (3)
!(uncontended = a.cas(v = a.value, v + x))) { // (4)
longAccumulate(x, null, uncontended); // (5)
}
}
}


LongAccumulator

LongAdder类是LongAccumulator的一个特例,LongAccumulator比LongAdder的功能更强大。例如下面的构造函数,其中 accumulatorFunction 是一个双目运算器接口,其根据输入的两个参数返回一个计算值,identity 则是 LongAccumulator 累加器的初始值。

1
2
3
4
public LongAccumulator(LongBinaryOperator accumulatorFunction, long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
1
2
3
4
public interface LongBinaryOperator {
//根据两个参数计算并返回一个值
long applyAsLong(long left, long right);
}

上面提到,LongAdder 其实是 LongAccumulator 的一个特例,调用 LongAdder 就相当于使用下面的方式调用 LongAccumulator :

1
2
3
4
5
6
7
LongAdder adder = new LongAdder();
LongAccumulator accumulator = new LongAccumulator(new LongBinaryOperator() {
@Override
public long applyAsLong(long left, long right) {
return left + right;
}
}, 0);

LongAccumulator 相比于 LongAdder,可以为累加器提供非 0 的初始值,后者只能提 供默认的 0 值。另外,前者还可以指定累加规则,比如不进行累加而进行相乘,只需要在 构造 LongAccumulator 时传入自定义的双目运算器即可,后者则内置累加的规则。